/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.job.reindex.service.impl;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.dao.vendor.VendorDataType;
import org.unidata.mdm.core.dao.vendor.VendorUtils;
import org.unidata.mdm.core.dao.vendor.VendorUtils.CopyDataOutputStream;
import org.unidata.mdm.core.service.job.JobCommonParameters;
import org.unidata.mdm.core.type.job.ModularBatchJobSupport;
import org.unidata.mdm.core.type.job.ModularBatchJobWriter;
import org.unidata.mdm.data.context.UpsertRequestContext;
import org.unidata.mdm.data.dao.StorageDAO;
import org.unidata.mdm.data.service.segments.records.batch.RecordsUpsertFinishExecutor;
import org.unidata.mdm.data.service.segments.records.batch.RecordsUpsertPersistenceExecutor;
import org.unidata.mdm.data.service.segments.records.batch.RecordsUpsertProcessExecutor;
import org.unidata.mdm.data.service.segments.records.batch.RecordsUpsertStartExecutor;
import org.unidata.mdm.data.service.segments.records.upsert.RecordUpsertFinishExecutor;
import org.unidata.mdm.data.service.segments.records.upsert.RecordUpsertIndexingExecutor;
import org.unidata.mdm.data.service.segments.records.upsert.RecordUpsertPersistenceExecutor;
import org.unidata.mdm.data.service.segments.records.upsert.RecordUpsertStartExecutor;
import org.unidata.mdm.data.service.segments.records.upsert.RecordUpsertTimelineExecutor;
import org.unidata.mdm.data.type.apply.batch.impl.RecordUpsertBatchSetAccumulator;
import org.unidata.mdm.job.reindex.configuration.ReindexJobConfigurationConstants;
import org.unidata.mdm.search.exception.SearchApplicationException;
import org.unidata.mdm.search.exception.SearchExceptionIds;
import org.unidata.mdm.system.service.ExecutionService;
import org.unidata.mdm.system.service.PipelineService;
import org.unidata.mdm.system.type.pipeline.Pipeline;
import org.unidata.mdm.system.type.pipeline.connection.PipelineConnection;

/**
 * @author Denis Kostovarov
 */
@Component
@StepScope
public class ReindexDataJobDataItemWriter extends ModularBatchJobWriter<UpsertRequestContext> implements InitializingBean {
    /**
     * Logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(ReindexDataJobDataItemWriter.class);
    /**
     * Max number of attempts to try to submit a commit interval.
     */
    private static final int MAX_ATTEMPTS = 3;
    /**
     * If true, ids will be logged to a table, if some shards fail.
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_WRITE_ID_LOG + "] ?: false}")
    private boolean writeIdLog;
    /**
     * Parameter used to set createDate for all imported items.
     */
    @Value("#{jobParameters[" + JobCommonParameters.PARAM_START_TIMESTAMP + "]}")
    private String jobStartTimestamp;
    /**
     * Job operation id. Also used to derive the id log table name.
     */
    @Value("#{jobParameters[" + JobCommonParameters.PARAM_OPERATION_ID + "]}")
    private String operationId;
    /**
     * If true, record's data will be reindexed (job-level parameter).
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_REINDEX_RECORDS + "] ?: false}")
    private boolean jobReindexRecords;
    /**
     * If true, record's data will be reindexed (step-level override from the step execution context).
     */
    @Value("#{stepExecutionContext[" + ReindexJobConfigurationConstants.PARAM_REINDEX_RECORDS + "] ?: false}")
    private boolean stepReindexRecords;
    /**
     * The commit interval.
     */
    @Value("${" + ReindexJobConfigurationConstants.PROP_NAME_DATA_COMMIT_INTERVAL + ":1000}")
    private int commitInterval;
    /**
     * Modular job support (accumulation and pipeline hooks from other modules).
     */
    @Autowired
    private ModularBatchJobSupport support;
    /**
     * PLS.
     */
    @Autowired
    private PipelineService pipelineService;
    /**
     * EXS.
     */
    @Autowired
    private ExecutionService executionService;
    /**
     * Storage DAO for id logging.
     */
    @Autowired
    private StorageDAO dataStorageDAO;
    /**
     * Start of chunk writing. Initialized once from the job start timestamp.
     */
    private Timestamp ts;
    /**
     * Accumulator for the writer's timespan.
     */
    private RecordUpsertBatchSetAccumulator accumulator;
    /**
     * BULK pipeline for the writer's timespan.
     */
    private Pipeline bulk;

    /**
     * Indexes a chunk of records, retrying on search cluster rejections.
     * <p>
     * The chunk is charged into the accumulator, offered to possibly registered
     * modular job extensions and then pushed through the BULK pipeline. If the
     * bulk request was rejected by the search cluster, the submission is retried
     * up to {@link #MAX_ATTEMPTS} times with a linearly growing backoff. After
     * the last failed attempt the record ids are written to the id log table,
     * so the records can be found and reindexed later. All other failures are
     * logged and the chunk is skipped (deliberate best effort behavior).
     *
     * @param items the chunk to index; empty chunks are ignored
     * @throws Exception if the backoff sleep is interrupted
     */
    @SuppressWarnings("unchecked")
    @Override
    public void write(List<? extends UpsertRequestContext> items) throws Exception {

        if (CollectionUtils.isEmpty(items)) {
            return;
        }

        // 1. Charge
        accumulator.charge((List<UpsertRequestContext>) items);

        // 2. Run possibly existing support
        support.forEach(pp -> pp.accumulate(getStepName(), getStepExecution(), accumulator));

        // 3. Execute with a bounded number of retries on shard rejections
        for (int attempt = 1; ; attempt++) {

            try {
                executionService.execute(bulk, accumulator);
                return;
            } catch (SearchApplicationException sae) {
                LOGGER.error("{} try error while indexing records.", attempt, sae);

                // Not a rejection (or id logging disabled) - best effort, skip the chunk.
                if (!isReject(sae)) {
                    return;
                }

                if (attempt >= MAX_ATTEMPTS) {
                    // Retries exhausted - persist ids for later reprocessing.
                    writeToIdLog((List<UpsertRequestContext>) items);
                    return;
                }

                // Linear backoff before the next attempt.
                Thread.sleep(1000L * attempt);
            } catch (Exception e) {
                // Deliberate best effort: an indexing failure must not abort the step.
                LOGGER.error("Error while indexing records.", e);
                return;
            }
        }
    }

    /**
     * Tells whether the given indexing failure was caused by search thread pool
     * rejections (cluster overload), which makes a retry / id log write worthwhile.
     *
     * @param sae the indexing exception to inspect
     * @return true, if id logging is enabled and at least one bulk item failed
     *         with an {@link EsRejectedExecutionException} in its cause chain
     */
    private boolean isReject(SearchApplicationException sae) {

        // NOTE(review): this short circuit also disables retry detection in write(),
        // so retries only happen when id logging is on - confirm this is intended.
        if (!writeIdLog) {
            return false;
        }

        Object supplied = sae.getResponse();
        if (sae.getId() != SearchExceptionIds.EX_INDEXING_EXCEPTION || !(supplied instanceof BulkResponse)) {
            LOGGER.info("Exception is not an RTE or no valid bulk response object supplied. Skip further processing steps.");
            return false;
        }

        BulkResponse response = (BulkResponse) supplied;
        if (!response.hasFailures()) {
            LOGGER.info("Response has no failures. Skip further processing steps.");
            return false;
        }

        // Look for a rejection anywhere in the cause chain of any failed item.
        for (BulkItemResponse bir : response.getItems()) {

            if (!bir.isFailed()) {
                continue;
            }

            Failure f = bir.getFailure();
            for (Throwable t = f.getCause(); t != null; t = t.getCause()) {
                if (t instanceof EsRejectedExecutionException) {
                    return true;
                }
            }
        }

        LOGGER.info("No rejection failures detected. Skip further processing steps.");
        return false;
    }

    /**
     * Writes (etalon id, entity name) pairs of the given chunk to the
     * per-operation id log table via the vendor's bulk COPY facility, so that
     * records rejected by the search cluster can be reindexed later.
     * SQL failures are logged and swallowed - id logging is best effort.
     *
     * @param items the chunk whose ids should be logged
     */
    private void writeToIdLog(List<UpsertRequestContext> items) {

        LOGGER.info("Shard rejections detected and retry count reached. Id log will be written.");

        List<Pair<String, String>> ids = items.stream()
            .map(i -> Pair.of(i.getEtalonKey(), i.getEntityName()))
            .collect(Collectors.toList());

        if (CollectionUtils.isEmpty(ids)) {
            return;
        }

        // Table name is derived from the operation id ('-' is illegal in identifiers).
        String tableName = "reindex_id_log_" + StringUtils.replace(operationId, "-", "_");
        final String prolog = "copy " + tableName + " (etalon_id, name, operation_id, ts) from stdin binary";

        // Column types for the COPY stream, in table column order.
        final VendorDataType[] types = {
            VendorDataType.UUID,
            VendorDataType.CHAR,
            VendorDataType.CHAR,
            VendorDataType.TIMESTAMP
        };

        final Object[] params = new Object[types.length];
        try (Connection connection = dataStorageDAO.getBareConnection();
             CopyDataOutputStream stream = VendorUtils.bulkStart(connection, prolog)) {

            for (Pair<String, String> i : ids) {

                params[0] = UUID.fromString(i.getLeft());
                params[1] = i.getRight();
                params[2] = operationId;
                params[3] = ts;

                VendorUtils.bulkAppend(stream, types, params);
            }

            VendorUtils.bulkFinish(stream);
        } catch (SQLException e) {
            LOGGER.error("SQL exception caught!", e);
        }
    }

    /**
     * Builds the per-record and bulk pipelines, the accumulator and the id log
     * timestamp once the bean's dependencies have been injected.
     *
     * @throws Exception from the {@link InitializingBean} contract; also if the
     *         job start timestamp parameter is not a parseable long
     */
    @Override
    public void afterPropertiesSet() throws Exception {

        // 1. Records internal pipeline (empty if record reindexing is off for both scopes)
        Pipeline p;
        if (stepReindexRecords || jobReindexRecords) {
            p = Pipeline.start(pipelineService.start(RecordUpsertStartExecutor.SEGMENT_ID))
                .with(pipelineService.point(RecordUpsertTimelineExecutor.SEGMENT_ID))
                .with(pipelineService.point(RecordUpsertIndexingExecutor.SEGMENT_ID))
                .with(pipelineService.point(RecordUpsertPersistenceExecutor.SEGMENT_ID))
                .end(pipelineService.finish(RecordUpsertFinishExecutor.SEGMENT_ID));
        } else {
            p = Pipeline.empty();
        }

        // 2. Records accumulator
        accumulator = new RecordUpsertBatchSetAccumulator(commitInterval, false);
        accumulator.setPipeline(p);

        // 3. Bulk pipeline, extended with connections contributed by other modules
        bulk = Pipeline.start(pipelineService.start(RecordsUpsertStartExecutor.SEGMENT_ID))
            .with(pipelineService.point(RecordsUpsertProcessExecutor.SEGMENT_ID))
            .with(pipelineService.point(RecordsUpsertPersistenceExecutor.SEGMENT_ID));

        support.forEach(pp -> {

            PipelineConnection connected = pp.connect(getStepName(), getStepExecution());
            if (Objects.isNull(connected)) {
                return;
            }

            bulk.with(connected);
        });

        bulk.end(pipelineService.finish(RecordsUpsertFinishExecutor.SEGMENT_ID));

        // 4. Id log TS
        ts = new Timestamp(Long.parseLong(jobStartTimestamp));
    }
}
